package main

import (
	"context"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// Shared configuration and coordination state for the producers and
// consumers in this file.
var (
	// topic is the Kafka topic the producers publish to; consumer splits it
	// on "," before subscribing.
	topic = "text"
	// bs is the broker address list passed to every sarama client
	// constructor in this file.
	bs = []string{"127.0.0.1:9092"}
	// wg tracks the consumer goroutines: main calls Add before starting
	// consumers and Wait after stopping them; each consumer goroutine calls
	// Done when it exits.
	wg sync.WaitGroup
	// stop is (re)created by main for each phase and closed to request
	// shutdown; consumer cancels its context once a receive on it completes.
	stop chan bool
)

// producerSync publishes ten numbered messages to topic through a
// synchronous sarama producer, sleeping one second after each attempt.
//
// It panics if the producer cannot be created. A failed send is logged and
// the loop moves on to the next message. The producer is closed before the
// function returns.
func producerSync() {
	c := sarama.NewConfig()
	c.Version = sarama.V2_0_0_0
	c.Producer.RequiredAcks = sarama.WaitForLocal
	c.Producer.Retry.Max = 3
	// A SyncProducer needs success reporting enabled (see sarama docs).
	c.Producer.Return.Successes = true

	p, err := sarama.NewSyncProducer(bs, c)
	if err != nil {
		panic(err)
	}
	// Release the producer's resources once all messages have been sent.
	defer func() {
		if err := p.Close(); err != nil {
			log.Println("Failed to close sync producer:", err)
		}
	}()

	for i := 0; i < 10; i++ {
		_, _, err := p.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(fmt.Sprintf("Message: %d", i)),
		})
		if err != nil {
			log.Println("Failed to write log entry:", err)
		} else {
			// Only report the send when it actually succeeded.
			log.Println("SendMessage:", i)
		}
		time.Sleep(time.Second)
	}
}

// producerAsync publishes ten numbered messages to topic through an
// asynchronous sarama producer, sleeping one second after queuing each one.
//
// Delivery results are logged by two background goroutines that drain the
// producer's Errors and Successes channels. After the last message has been
// queued the producer is shut down with AsyncClose and the function waits
// until both channels have been closed, so no goroutine outlives the call.
//
// It panics if the producer cannot be created.
func producerAsync() {
	c := sarama.NewConfig()
	c.Version = sarama.V2_0_0_0
	c.Producer.RequiredAcks = sarama.WaitForLocal
	c.Producer.Compression = sarama.CompressionSnappy
	c.Producer.Flush.Frequency = 500 * time.Millisecond
	// Success reporting is off by default in sarama; without it nothing is
	// ever delivered on Successes().
	c.Producer.Return.Successes = true

	p, err := sarama.NewAsyncProducer(bs, c)
	if err != nil {
		panic(err)
	}

	// Drain both result channels until the producer closes them. A local
	// WaitGroup is used so the package-level wg (owned by the consumers) is
	// not affected.
	var drained sync.WaitGroup
	drained.Add(2)
	go func() {
		defer drained.Done()
		for err := range p.Errors() {
			log.Println("Failed to write log entry:", err)
		}
	}()
	go func() {
		defer drained.Done()
		for range p.Successes() {
			log.Println("Success to write log entry:")
		}
	}()

	for i := 0; i < 10; i++ {
		p.Input() <- &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(fmt.Sprintf("Message: %d", i)),
		}
		time.Sleep(time.Second)
	}

	// Flush in-flight messages and shut down; the drain goroutines finish
	// once the producer has closed Errors() and Successes().
	p.AsyncClose()
	drained.Wait()
}

// consumer starts a consumer-group member for topic in the given group and
// returns once its first session has been set up.
//
// index and group are only used to label the log output of the Consumer
// handler. The member keeps consuming in a background goroutine until the
// package-level stop channel (as it was when consumer was called) is closed;
// the goroutine then closes the client and calls wg.Done, so the caller must
// have done a matching wg.Add beforehand.
//
// It panics if the Kafka version cannot be parsed, the client cannot be
// created, or Consume returns an error.
func consumer(index int, group string) {
	version, err := sarama.ParseKafkaVersion("2.4.1")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	c := sarama.NewConfig()
	c.Version = version
	c.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

	clt, err := sarama.NewConsumerGroup(bs, group, c)
	if err != nil {
		panic(err)
	}

	con := &Consumer{group: group, index: index, ready: make(chan bool)}
	// Keep our own references: con.ready is replaced after every session
	// below, and the global stop may be reassigned by the caller later.
	ready := con.ready
	done := stop

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-done
		cancel()
	}()

	topics := strings.Split(topic, ",")
	go func() {
		defer wg.Done()
		for {
			// Consume returns whenever the session ends (for example on a
			// rebalance) and has to be called again to rejoin the group.
			if err := clt.Consume(ctx, topics, con); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}

			// Stop only when shutdown was actually requested.
			if ctx.Err() != nil {
				break
			}

			// Setup closes con.ready at the start of each session, so the
			// next session needs a fresh channel. Setup is invoked from
			// within Consume, i.e. on this same goroutine.
			con.ready = make(chan bool)
		}
		if err := clt.Close(); err != nil {
			log.Printf("Error closing consumer group client: %v", err)
		}
	}()

	// Await till the consumer has been set up
	<-ready
}

// Consumer represents a Sarama consumer group consumer.
// It provides the Setup, Cleanup and ConsumeClaim methods that are passed
// to the consumer group client's Consume call.
type Consumer struct {
	// index identifies this consumer within its group; it is only used in
	// the log line written by ConsumeClaim.
	index int
	// group is the consumer group name; it is only used in the log line
	// written by ConsumeClaim.
	group string
	// ready is closed by Setup to signal that a session has started; the
	// consumer function blocks on it before returning.
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
// It closes consumer.ready to signal that the consumer is set up and
// always returns nil.
//
// Setup runs once per session, so it may be called more than once on the
// same Consumer (for example after a rebalance). The close is therefore
// skipped when ready has already been closed, instead of panicking with
// "close of closed channel".
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	select {
	case <-consumer.ready:
		// Already closed by an earlier session; nothing to signal.
	default:
		close(consumer.ready)
	}
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. This handler holds no per-session state, so there is nothing
// to release and it always reports success.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
//
// Every message received from the claim is logged together with this
// consumer's group and index and then marked as consumed on the session.
// The loop ends, returning nil, when the claim's message channel is closed
// or when the session's context is done, so the claim is released promptly
// on rebalance or shutdown instead of waiting for the channel to close.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: no more messages for this claim.
				return nil
			}
			log.Printf("Message claimed: group = %s, index = %d, value = %s, timestamp = %v, topic = %s",
				consumer.group, consumer.index, string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// The session is ending; stop without draining further messages.
			// Unmarked messages will be delivered again in a later session.
			return nil
		}
	}
}

func main() {
	log.Println("producer sync")

	{
		stop = make(chan bool)
		n := 1
		wg.Add(n)
		for i := 0; i < n; i++ {
			consumer(i, "group1")
		}

		wg.Add(n)
		for i := 0; i < n; i++ {
			consumer(i, "group2")
		}

		producerSync()

		close(stop)
		wg.Wait()
	}

	log.Println("producer async")

	{
		stop = make(chan bool)
		n := 1
		wg.Add(n)
		for i := 0; i < n; i++ {
			consumer(i, "group1")
		}

		wg.Add(n)
		for i := 0; i < n; i++ {
			consumer(i, "group2")
		}

		producerAsync()

		close(stop)
		wg.Wait()
	}
}
